{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "execution_count": 1,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "ki9N8iqRxu0I",
        "outputId": "fd1628a5-d2a4-44a4-89b4-31151d21c8f3"
      },
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Collecting pocketflow\n",
            "  Downloading pocketflow-0.0.1-py3-none-any.whl.metadata (270 bytes)\n",
            "Downloading pocketflow-0.0.1-py3-none-any.whl (3.3 kB)\n",
            "Installing collected packages: pocketflow\n",
            "Successfully installed pocketflow-0.0.1\n"
          ]
        }
      ],
      "source": [
        "pip install pocketflow"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "import asyncio\n",
        "import time\n",
        "\n",
        "from pocketflow import AsyncBatchNode, AsyncParallelBatchNode, AsyncFlow\n",
        "\n",
        "####################################\n",
        "# Dummy async function (1s delay)\n",
        "####################################\n",
        "async def dummy_llm_summarize(text):\n",
        "    \"\"\"Simulates an async LLM call that takes 1 second.\"\"\"\n",
        "    await asyncio.sleep(1)\n",
        "    return f\"Summarized({len(text)} chars)\"\n",
        "\n",
        "###############################################\n",
        "# 1) AsyncBatchNode (sequential) version\n",
        "###############################################\n",
        "\n",
        "class SummariesAsyncNode(AsyncBatchNode):\n",
        "    \"\"\"\n",
        "    Processes items sequentially in an async manner.\n",
        "    The next item won't start until the previous item has finished.\n",
        "    \"\"\"\n",
        "\n",
        "    async def prep_async(self, shared):\n",
        "        # Return a list of items to process.\n",
        "        # Each item is (filename, content).\n",
        "        return list(shared[\"data\"].items())\n",
        "\n",
        "    async def exec_async(self, item):\n",
        "        filename, content = item\n",
        "        print(f\"[Sequential] Summarizing {filename}...\")\n",
        "        summary = await dummy_llm_summarize(content)\n",
        "        return (filename, summary)\n",
        "\n",
        "    async def post_async(self, shared, prep_res, exec_res_list):\n",
        "        # exec_res_list is a list of (filename, summary)\n",
        "        shared[\"sequential_summaries\"] = dict(exec_res_list)\n",
        "        return \"done_sequential\"\n",
        "\n",
        "###############################################\n",
        "# 2) AsyncParallelBatchNode (concurrent) version\n",
        "###############################################\n",
        "\n",
        "class SummariesAsyncParallelNode(AsyncParallelBatchNode):\n",
        "    \"\"\"\n",
        "    Processes items in parallel. Many LLM calls start at once.\n",
        "    \"\"\"\n",
        "\n",
        "    async def prep_async(self, shared):\n",
        "        return list(shared[\"data\"].items())\n",
        "\n",
        "    async def exec_async(self, item):\n",
        "        filename, content = item\n",
        "        print(f\"[Parallel] Summarizing {filename}...\")\n",
        "        summary = await dummy_llm_summarize(content)\n",
        "        return (filename, summary)\n",
        "\n",
        "    async def post_async(self, shared, prep_res, exec_res_list):\n",
        "        shared[\"parallel_summaries\"] = dict(exec_res_list)\n",
        "        return \"done_parallel\"\n",
        "\n",
        "###############################################\n",
        "# Demo comparing the two approaches\n",
        "###############################################\n",
        "\n",
        "async def main():\n",
        "    # We'll use the same data for both flows\n",
        "    shared_data = {\n",
        "        \"data\": {\n",
        "            \"file1.txt\": \"Hello world 1\",\n",
        "            \"file2.txt\": \"Hello world 2\",\n",
        "            \"file3.txt\": \"Hello world 3\",\n",
        "        }\n",
        "    }\n",
        "\n",
        "    # 1) Run the sequential version\n",
        "    seq_node = SummariesAsyncNode()\n",
        "    seq_flow = AsyncFlow(start=seq_node)\n",
        "\n",
        "    print(\"\\n=== Running Sequential (AsyncBatchNode) ===\")\n",
        "    t0 = time.time()\n",
        "    await seq_flow.run_async(shared_data)\n",
        "    t1 = time.time()\n",
        "\n",
        "    # 2) Run the parallel version\n",
        "    par_node = SummariesAsyncParallelNode()\n",
        "    par_flow = AsyncFlow(start=par_node)\n",
        "\n",
        "    print(\"\\n=== Running Parallel (AsyncParallelBatchNode) ===\")\n",
        "    t2 = time.time()\n",
        "    await par_flow.run_async(shared_data)\n",
        "    t3 = time.time()\n",
        "\n",
        "    # Show times\n",
        "    print(\"\\n--- Results ---\")\n",
        "    print(f\"Sequential Summaries: {shared_data.get('sequential_summaries')}\")\n",
        "    print(f\"Parallel Summaries:   {shared_data.get('parallel_summaries')}\")\n",
        "\n",
        "    print(f\"Sequential took: {t1 - t0:.2f} seconds\")\n",
        "    print(f\"Parallel took:   {t3 - t2:.2f} seconds\")\n"
      ],
      "metadata": {
        "id": "mHZpGv8txy4L"
      },
      "execution_count": 3,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# if in a py project\n",
        "# asyncio.run(main())\n",
        "await main()"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "zfnhW3f-0W6o",
        "outputId": "3737e2e5-5cae-4c6b-a894-e880cf338d1f"
      },
      "execution_count": 5,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "\n",
            "=== Running Sequential (AsyncBatchNode) ===\n",
            "[Sequential] Summarizing file1.txt...\n",
            "[Sequential] Summarizing file2.txt...\n",
            "[Sequential] Summarizing file3.txt...\n",
            "\n",
            "=== Running Parallel (AsyncParallelBatchNode) ===\n",
            "[Parallel] Summarizing file1.txt...\n",
            "[Parallel] Summarizing file2.txt...\n",
            "[Parallel] Summarizing file3.txt...\n",
            "\n",
            "--- Results ---\n",
            "Sequential Summaries: {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
            "Parallel Summaries:   {'file1.txt': 'Summarized(13 chars)', 'file2.txt': 'Summarized(13 chars)', 'file3.txt': 'Summarized(13 chars)'}\n",
            "Sequential took: 3.00 seconds\n",
            "Parallel took:   1.00 seconds\n"
          ]
        }
      ]
    },
    {
      "cell_type": "code",
      "source": [],
      "metadata": {
        "id": "ystwa74D0Z_k"
      },
      "execution_count": null,
      "outputs": []
    }
  ]
}